feat: add IngestTraces client for dedicated ingest service #501
    async def ingest_traces(self, traces_ingest_request: TracesIngestRequest) -> dict[str, Any]:
        if self.experiment_id:
            traces_ingest_request.experiment_id = UUID(self.experiment_id)
        elif self.log_stream_id:
            traces_ingest_request.log_stream_id = UUID(self.log_stream_id)
IngestTraces.ingest_traces repeats almost the same flow as ingest_spans: set the experiment/log stream IDs, flip on LoggingMethod.python_client, build the ingest URL, log the payload size, and post via an httpx.AsyncClient. The only real differences between the two methods are which route/attribute they use and the log message. Any future change to headers, timeout, metrics, or logging therefore needs to be made twice in blocks that are otherwise identical. Can we extract a shared helper (e.g. _post_to_ingest(endpoint: str, payload_attr: str, log_message: str, request: BaseModel)) that handles the ID injection, LoggingMethod assignment, URL construction, httpx request, and logging, and then let ingest_traces/ingest_spans simply call it with the different route and attribute? That keeps the helper consistent (shown below) and leaves the call sites tiny:
    async def _post_to_ingest(...):
        if self.experiment_id:
            request.experiment_id = UUID(self.experiment_id)
        elif self.log_stream_id:
            request.log_stream_id = UUID(self.log_stream_id)
        request.logging_method = LoggingMethod.python_client
        url = f"{self._get_ingest_base_url()}{endpoint}"
        _logger.info(log_message, extra={"url": url, "project_id": self.project_id, "num_items": len(getattr(request, payload_attr))})
        async with httpx.AsyncClient(...) as client:
            response = await client.post(url, json=request.model_dump(mode="json"), headers=self._get_auth_headers())
            response.raise_for_status()
            return response.json()

    async def ingest_traces(...):
        return await self._post_to_ingest(Routes.ingest_traces.format(...), "traces", "Sending traces to ingest service", traces_ingest_request)

    async def ingest_spans(...):
        return await self._post_to_ingest(Routes.ingest_spans.format(...), "spans", "Sending spans to ingest service", spans_ingest_request)
This avoids the same control flow being duplicated across the two methods.
Finding type: Code Dedup and Conventions | Severity: 🟢 Low
    _logger.info(
        "Sending traces to ingest service",
        extra={"url": url, "project_id": self.project_id, "num_traces": len(traces_ingest_request.traces)},
    )
ingest_traces logs the "Sending traces to ingest service" start event but never logs success or failure after httpx.AsyncClient.post/response.raise_for_status. This violates AGENTS.md's "Logging & sensitive-data handling" lifecycle requirement and leaves ingest writes unobservable. Can we add a completion log (success or failure with non-sensitive context) immediately after the POST/response.raise_for_status?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Prompt for AI Agents:
In src/galileo/traces.py around lines 227 to 231, the ingest_traces method only logs the
start of the request but never logs success or failure after the HTTP POST. Modify
ingest_traces to wrap the httpx post/response.raise_for_status sequence in a try/except:
after a successful response.raise_for_status() emit an _logger.info() completion log
with non-sensitive context (url, project_id, num_traces, response.status_code); on
exceptions (catch httpx.HTTPStatusError and a generic Exception) emit an _logger.error()
with safe context (url, project_id, num_traces, status code or exception message
trimmed) without logging request/response bodies or auth headers, then re-raise the
exception to preserve existing error behavior.
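The start/completion logging requested above can be sketched as follows. This is only an illustration under stated assumptions: `post_with_lifecycle_logs` and `FakeResponse` are hypothetical names, and the stand-in response lets the example run without httpx or network access. In the real client the awaited call would be the `client.post(...)` shown in the diff.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

_logger = logging.getLogger("ingest_sketch")


class FakeResponse:
    """Stand-in for an HTTP response (hypothetical, not httpx.Response)."""

    def __init__(self, status_code: int, payload: dict[str, Any]) -> None:
        self.status_code = status_code
        self._payload = payload

    def raise_for_status(self) -> None:
        if self.status_code >= 400:
            raise RuntimeError(f"HTTP {self.status_code}")

    def json(self) -> dict[str, Any]:
        return self._payload


async def post_with_lifecycle_logs(
    send: Callable[[], Awaitable[FakeResponse]], url: str, project_id: str, num_items: int
) -> dict[str, Any]:
    # Only non-sensitive context is logged: no bodies, headers, or credentials.
    context = {"url": url, "project_id": project_id, "num_items": num_items}
    _logger.info("Sending traces to ingest service", extra=context)
    try:
        response = await send()
        response.raise_for_status()
    except Exception as exc:
        # Record the failure with the exception type only, then re-raise unchanged.
        _logger.error("Failed sending traces to ingest service", extra={**context, "error": type(exc).__name__})
        raise
    _logger.info("Finished sending traces to ingest service", extra={**context, "status_code": response.status_code})
    return response.json()


if __name__ == "__main__":
    async def _demo_send() -> FakeResponse:
        return FakeResponse(200, {"accepted": 1})

    print(asyncio.run(post_with_lifecycle_logs(_demo_send, "https://ingest.example/ingest/traces/p1", "p1", 1)))
```

Because the except branch re-raises, callers see the same exception as before; only the log output changes.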
    _logger.info(
        "Sending spans to ingest service",
        extra={"url": url, "project_id": self.project_id, "num_spans": len(spans_ingest_request.spans)},
    )
ingest_spans only logs 'Sending spans to ingest service' before the HTTP POST and never emits a completion log. AGENTS.md requires start and completion lifecycle logs; without the completion log, operators lack success/failure context. Can we add a completion log after response.raise_for_status() succeeds, without including sensitive data?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Prompt for AI Agents:
In src/galileo/traces.py around lines 252 to 256, the ingest_spans method only logs the
start of the HTTP request and never emits a completion/success log. Add a safe
completion log immediately after response.raise_for_status() and before returning
response.json(), e.g. call _logger.info("Finished sending spans to ingest service",
extra={"url": url, "project_id": self.project_id, "num_spans":
len(spans_ingest_request.spans), "status_code": response.status_code}). Do not log
response body, headers, auth, or any sensitive data. Keep the rest of the method
unchanged.
1a29b4f to 2405b0e
cce7d18 to 4d196b7
    client = self._ingest_client or self._traces_client
    await client.ingest_traces(traces_ingest_request)
Flush in batch mode can keep using the traces client instead of the dedicated ingest client when GALILEO_INGEST_URL is set after constructing GalileoLogger. Flush selects client via client = self._ingest_client or self._traces_client and immediately calls await client.ingest_traces(...) (lines 1929-1930), but _ingest_client is only created in __init__ so it stays None; can we call _create_ingest_client() before selecting the client (as async_ingest_traces does around 2124-2130) and apply the same lazy-creation fix to other similar call sites?
Finding type: Logical Bugs | Severity: 🔴 High
Prompt for AI Agents:
In src/galileo/logger/logger.py around lines 1929-1930 (method _flush_batch), the code
picks client = self._ingest_client or self._traces_client and then calls await
client.ingest_traces(...). This fails if GALILEO_INGEST_URL was set after the
GalileoLogger was constructed because _ingest_client was never created. Change the
selection to lazily create the ingest client first (e.g. if self._ingest_client is None
and os.environ.get('GALILEO_INGEST_URL'): self._ingest_client =
self._create_ingest_client()), then set client = self._ingest_client or
self._traces_client (and ensure _traces_client is created if needed). Also update the
similar client-selection logic in _ingest_span_streaming (lines ~550-593) and
_ingest_trace_streaming (lines ~516-542) to lazily create _ingest_client before using it
so the dedicated ingest service is used if the env var is present.
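A minimal sketch of the lazy-creation pattern described above, assuming a hypothetical `ClientSelector` class that stands in for the client-selection part of GalileoLogger. The string placeholders replace the real client objects, and `_ensure_ingest_client` and `select_client` are illustrative names, not methods from the PR.

```python
import os
from typing import Optional


class ClientSelector:
    """Hypothetical stand-in for the client-selection logic in GalileoLogger."""

    def __init__(self) -> None:
        self._traces_client = "traces-client"  # placeholder for the main API client
        self._ingest_client: Optional[str] = None
        self._ensure_ingest_client()

    def _create_ingest_client(self) -> str:
        # Placeholder: the real method would build an IngestTraces instance.
        return "ingest-client"

    def _ensure_ingest_client(self) -> None:
        # Create the ingest client the first time the env var is observed.
        if self._ingest_client is None and os.environ.get("GALILEO_INGEST_URL"):
            self._ingest_client = self._create_ingest_client()

    def select_client(self) -> str:
        # Re-check at call time so an env var set after __init__ still takes effect.
        self._ensure_ingest_client()
        return self._ingest_client or self._traces_client
```

Calling `select_client()` from each ingestion path (batch flush and both streaming paths) keeps the check in one place. Note that in this sketch the ingest client stays cached once created, even if the variable is later unset.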
    @patch("galileo.traces.GalileoPythonConfig")
    def test_accepts_log_stream_id(self, mock_config_class) -> None:
        mock_config_class.get.return_value = Mock()

        client = IngestTraces(project_id=PROJECT_ID, log_stream_id=LOG_STREAM_ID)
test_accepts_log_stream_id and other methods in TestIngestTracesInit omit the sentence-case # Given:/When:/Then: comments required by AGENTS.md, so these tests deviate from the documented testing convention and reduce clarity. Can we add non-empty sentence-case # Given:/When:/Then: comments to each test in tests/?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Prompt for AI Agents:
In tests/test_ingest_traces_client.py around lines 35 to 39 (and similarly for the other
test methods in this file such as those in TestIngestTracesInit and the request test
classes), the test methods lack the required sentence-case `# Given:/When:/Then:`
comments per AGENTS.md. Edit each test function to add three short, sentence-case
comments: `# Given:` describing the test setup, `# When:` describing the action under
test, and `# Then:` describing the expected outcome (do not leave any of them empty).
Keep existing code and assertions unchanged aside from inserting these comments in the
appropriate places immediately above the relevant code blocks.
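For illustration, a test following the `# Given:/When:/Then:` layout could look like the sketch below. `FakeIngestClient` is a hypothetical stand-in for IngestTraces so the example runs on its own; the IDs are made-up values.

```python
from unittest.mock import Mock


class FakeIngestClient:
    """Hypothetical stand-in for IngestTraces; it only stores its constructor arguments."""

    def __init__(self, project_id: str, log_stream_id: str, config: Mock) -> None:
        self.project_id = project_id
        self.log_stream_id = log_stream_id
        self.config = config


def test_accepts_log_stream_id() -> None:
    # Given: a mocked configuration and a log stream ID.
    config = Mock()
    log_stream_id = "00000000-0000-0000-0000-000000000001"

    # When: the client is constructed with that log stream ID.
    client = FakeIngestClient(project_id="project-1", log_stream_id=log_stream_id, config=config)

    # Then: the client keeps the log stream ID it was given.
    assert client.log_stream_id == log_stream_id
```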
    @respx.mock
    @pytest.mark.asyncio
    async def test_ingest_traces_posts_to_correct_url(self, client, monkeypatch) -> None:
        # Given: an ingest URL is configured
        monkeypatch.setenv("GALILEO_INGEST_URL", INGEST_URL)
TestIngestTracesRequest uses @respx.mock/httpx instead of the shared mock_request fixture required by AGENTS.md (lines 207–235) and tests/conftest.py. This bypasses the shared setup (including --disable-socket) and breaks the documented mocking convention; can we refactor these tests to use mock_request and the accompanying mocks instead of wiring respx manually?
Finding type: AI Coding Guidelines | Severity: 🟢 Low
Prompt for AI Agents:
In tests/test_ingest_traces_client.py around lines 148 to 153, the tests in
TestIngestTracesRequest use @respx.mock and respx.post to stub HTTP calls which violates
AGENTS.md and bypasses the shared mock_request fixture. Refactor these tests to remove
@respx.mock and respx usage: have each async test accept the mock_request fixture (add
mock_request to the parameter list), use that fixture to register the expected ingest
URL and canned httpx response (matching the JSON bodies used today), and assert route
calls via the mock_request tracking API instead of respx. Apply the same replacement to
the other tests in TestIngestTracesRequest and TestIngestSpansRequest that use respx so
all HTTP interactions reuse mock_request and respect --disable-socket.
1875ca6 to c9eb90e
c9eb90e to f2c95c6
Adds a new IngestTraces client that talks directly to the Go ingest service via httpx, activated when GALILEO_INGEST_URL is set. Falls back to the existing Traces client (main API) otherwise. [sc-58541]
Do not serialize trace input/output to string; preserve multimodal content. Add test_multimodal_input_not_stringified_at_trace_level.
4d196b7 to 2123eae
      trace = LoggedTrace(
    -     input=serialize_to_str(input),
    -     redacted_input=serialize_to_str(redacted_input) if redacted_input else None,
    -     output=serialize_to_str(output),
    -     redacted_output=serialize_to_str(redacted_output) if redacted_output else None,
    +     input=input,
    +     redacted_input=redacted_input,
    +     output=output,
    +     redacted_output=redacted_output,
add_single_llm_span_trace now constructs LoggedTrace by passing input/output/redacted_input/redacted_output directly (new lines 1034–1038), but LoggedTrace.input/output only accept str or Sequence[LoggedMessage] and the previous implementation serialized dicts/core Message via serialize_to_str. Callers that pass dicts or Message objects will fail validation before ingestion. Can we restore trace-level serialization/normalization before constructing LoggedTrace, or else narrow the API to only accept str/Sequence[LoggedMessage], and review related helpers such as add_llm_span for the same regression?
Finding type: Breaking Changes | Severity: 🔴 High
Prompt for AI Agents:
In src/galileo/logger/logger.py around lines 1034 to 1038, the add_single_llm_span_trace
method now constructs LoggedTrace by passing input/output/redacted_input/redacted_output
directly, which breaks validation because LoggedTrace expects str or
Sequence[LoggedMessage]. Restore the previous trace-level serialization by calling
serialize_to_str(input) and serialize_to_str(output) (and
serialize_to_str(redacted_input)/serialize_to_str(redacted_output) when present) before
creating LoggedTrace, while keeping the full structured values for the child
LoggedLlmSpan as currently implemented. Also search nearby helper methods that used
serialize_to_str (e.g., add_llm_span/add_span) and ensure consistent normalization
behavior so callers passing dicts or Message objects continue to work.
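One way to normalize values before constructing the trace is sketched below, under loud assumptions: `normalize_trace_value` is a hypothetical helper, `json.dumps` stands in for serialize_to_str (whose exact behavior is not shown in this thread), and a plain `list` stands in for Sequence[LoggedMessage]. Strings and lists pass through so message lists are not stringified; anything else is serialized.

```python
import json
from typing import Any, Optional, Union

TraceValue = Union[str, list]


def normalize_trace_value(value: Any) -> Optional[TraceValue]:
    """Hypothetical helper: coerce a caller-supplied value into a shape the trace model accepts."""
    if value is None:
        return None
    # Strings and message lists are assumed to be valid already, so they pass through untouched.
    if isinstance(value, (str, list)):
        return value
    # Anything else (for example a dict) is serialized; json.dumps stands in for serialize_to_str.
    return json.dumps(value, default=str)
```

Applying such a helper to input, output, redacted_input, and redacted_output at the call site would keep dict callers working without forcing every value to a string.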
    if self.current_parent() is not None:
        raise ValueError("You must conclude the existing trace before adding a new one.")
    trace = LoggedTrace(
        input=input,
        redacted_input=redacted_input,
The guard/LoggedTrace constructor here duplicates all of GalileoLogger.add_trace (src/galileo/logger/logger.py lines 425‑463) – the same current_parent() check, metrics/dataset defaults, and parent tracking exist in both classes. Any future change to the LoggedTrace contract (metadata conversion, dataset fields, parent management) will now require edits in two places. Could we share a helper (e.g. in TracesLogger or a small builder) so the hook-mode builder and the main logger reuse the same trace construction logic?
Finding type: Code Dedup and Conventions | Severity: 🟢 Low
    span = LoggedWorkflowSpan(
        input=input,
        redacted_input=redacted_input,
        output=output,
        redacted_output=redacted_output,
The LoggedWorkflowSpan construction here mirrors GalileoLogger.add_workflow_span (see src/galileo/logger/logger.py lines 1473‑1550) almost verbatim – same metadata conversion, metrics, UUID creation, parent wiring, and status handling. Because the hook-mode builder and the main logger both need to keep this span shape and parent manipulation in sync, can we move the shared span creation/parent tracking into a helper (or re‑use GalileoLogger._attach_parentable_span) so we don’t duplicate the same 10+ lines in two modules?
Finding type: Code Dedup and Conventions | Severity: 🟢 Low
    response = await client.post(url, json=json_body, headers=self._get_auth_headers())
    response.raise_for_status()
    return response.json()
response.json() can raise JSONDecodeError for empty/non-JSON 200/204 responses; should we catch JSONDecodeError and translate it into a controlled SDK error or default payload?
Finding type: Logical Bugs | Severity: 🔴 High
Prompt for AI Agents:
In src/galileo/traces.py around lines 233 to 236, the ingest_traces method calls
response.json() directly after response.raise_for_status(), which will raise
json.JSONDecodeError if the ingest service returns an empty or non-JSON body. Wrap the
call to response.json() in a try/except catching json.JSONDecodeError (import json or
JSONDecodeError), log a warning including the response.status_code and response.text,
and return a safe default payload (e.g. an empty dict) or raise a controlled
SDK-specific error instead of letting the JSONDecodeError escape. Apply the same
defensive change to the ingest_spans method at the analogous block (around lines 256 to
258) so both endpoints behave consistently.
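The defensive parsing described above might look like the following sketch. `parse_json_body` is a hypothetical helper that works on the raw body text, so it runs without httpx; returning an empty dict is one of the two options the comment mentions, not a confirmed design. Unlike the prompt's suggestion to log response.text, this version logs only the body length, to stay on the safe side of the sensitive-data guidance.

```python
import json
import logging
from typing import Any

_logger = logging.getLogger("ingest_sketch")


def parse_json_body(body: str, status_code: int) -> dict[str, Any]:
    """Parse a response body, returning {} when it is empty or not valid JSON."""
    if not body.strip():
        # Empty bodies (for example on a 204) are treated as "no payload".
        return {}
    try:
        parsed = json.loads(body)
    except json.JSONDecodeError:
        # Log the status code and body length only, not the body itself.
        _logger.warning(
            "Ingest response was not valid JSON",
            extra={"status_code": status_code, "body_length": len(body)},
        )
        return {}
    # The ingest methods are annotated as returning a dict, so other JSON types map to {}.
    return parsed if isinstance(parsed, dict) else {}
```

In the client this would replace the bare `return response.json()` with something like `return parse_json_body(response.text, response.status_code)`.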
    IngestInputType = Union[str, Sequence[LoggedMessage]]
    IngestOutputType = Union[str, LoggedMessage, Sequence[Document]]

    _INPUT_FIELD = Field(default="", description=BaseStep.model_fields["input"].description, union_mode="left_to_right")
    _REDACTED_INPUT_FIELD = Field(
        default=None, description=BaseStep.model_fields["redacted_input"].description, union_mode="left_to_right"
    )
    _OUTPUT_FIELD = Field(default=None, description=BaseStep.model_fields["output"].description, union_mode="left_to_right")
    _REDACTED_OUTPUT_FIELD = Field(
start_trace advertises Message input but lacks conversion to LoggedMessage — should we add a trace-level conversion helper?
Finding type: Logical Bugs | Severity: 🔴 High
Prompt for AI Agents:
In src/galileo/schema/logged.py around lines 30-38, the IngestInputType and
LoggedTrace.input currently only accept str or Sequence[LoggedMessage], but callers may
pass core Message objects. Add conversion helpers and a model-level coercion so
LoggedTrace will accept core Message/dict inputs and convert them into LoggedMessage
instances before validation. Concretely: implement a classmethod on LoggedTrace similar
to LoggedLlmSpan._to_logged_message and _convert_dict_to_message, then add a Pydantic
root validator or model_post_init that walks input and redacted_input and replaces any
Message or dict entries with LoggedMessage via those helpers (also consider doing the
same for output/redacted_output if they can be Message types). This preserves the
documented start_trace behavior and prevents Pydantic validation errors.
User description
Summary
- Adds an IngestTraces client class that communicates directly with the Go ingest service via httpx, bypassing the auto-generated API client
- Activated when GALILEO_INGEST_URL is set; falls back to the existing Traces client otherwise
- Wired into the GalileoLogger ingestion paths (batch flush, streaming traces, streaming spans)

Changes

New: IngestTraces client (src/galileo/traces.py)
- httpx.AsyncClient-based client targeting POST /ingest/traces/{project_id} and POST /ingest/spans/{project_id}
- GALILEO_INGEST_URL env var, falling back to api_url
- @async_warn_catch_exception for resilient telemetry semantics

Modified: GalileoLogger (src/galileo/logger/logger.py)
- _ingest_client field, created only when GALILEO_INGEST_URL is set
- self._ingest_client or self._traces_client fallback pattern
- _ensure_project_and_log_stream, _create_traces_client, and _create_ingest_client for lazy creation

New routes (src/galileo/constants/routes.py)
- ingest_traces = "/ingest/traces/{project_id}"
- ingest_spans = "/ingest/spans/{project_id}"

Tests (tests/test_ingest_traces_client.py)
- respx-mocked HTTP tests for ingest_traces and ingest_spans

Why

The Go ingest service runs at a separate URL from the main API and exposes raw HTTP endpoints (no OpenAPI client). This PR adds a thin httpx client so the SDK can route ingestion traffic directly to it when configured, without changing any behavior for users who don't set the env var.

Test plan
- test_ingest_traces_client.py tests pass

Fixes sc-58541
Made with Cursor
Generated description
Below is a concise technical summary of the changes proposed in this PR:
Describe how the SDK now targets the dedicated ingest service when GALILEO_INGEST_URL is configured while retaining existing APIs for batch/streaming ingestion in environments without that flag. Explain how the trace/span models (and the ADK builder) now use Logged* variants plus message/content-block helpers so multimodal inputs survive ingestion and are exercised by the logging tests.
- IngestTraces client when the ingest endpoint env var is set, using the added routes and client wiring for both batch and streaming ingestion while falling back to the auto-generated Traces client otherwise. Include the new lazy creation helpers plus streaming ingestion overrides and the tests/test_ingest_traces_client.py coverage. Modified files (4)
- LoggedTrace/LoggedSpan/LoggedMessage plus the IngestContentBlock helpers so multimodal inputs/outputs stay typed, and how the ADK trace builder plus the batch logger tests verify that nothing is stringified when flushing. Modified files (8)